{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-schedule-for-a-published-pipeline.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# How to Set Up a Schedule for a Published Pipeline\n",
        "This notebook shows how to run an already published pipeline on a schedule."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prerequisites and AML Basics\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't already. It sets you up with a working config file that contains your workspace information, subscription id, etc.\n",
        "\n",
        "### Initialization Steps"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "from azureml.core import Workspace\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)\n",
        "\n",
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Compute Targets\n",
        "#### Retrieve an already attached Azure Machine Learning Compute"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import AmlCompute, ComputeTarget\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "aml_compute_target = \"cpu-cluster\"\n",
        "try:\n",
        "    aml_compute = AmlCompute(ws, aml_compute_target)\n",
        "    print(\"Found existing compute target: {}\".format(aml_compute_target))\n",
        "except ComputeTargetException:\n",
        "    print(\"Creating new compute target: {}\".format(aml_compute_target))\n",
        "    \n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n",
        "                                                                min_nodes = 1, \n",
        "                                                                max_nodes = 4)    \n",
        "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n",
        "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Build and Publish Pipeline\n",
        "Build a simple pipeline, publish it and add a schedule to run it."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Define a pipeline step\n",
        "Define a single-step pipeline for demonstration purposes. The best practice is to give each step its own folder containing its script and dependent files, and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, keeping folders separate also preserves step reuse when nothing in the step's `source_directory` has changed."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "\n",
        "source_directory = \"publish_run_train\"\n",
        "\n",
        "trainStep = PythonScriptStep(\n",
        "    name=\"Training_Step\",\n",
        "    script_name=\"train.py\", \n",
        "    compute_target=aml_compute_target, \n",
        "    source_directory=source_directory\n",
        ")\n",
        "print(\"TrainStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Build the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Pipeline\n",
        "\n",
        "pipeline1 = Pipeline(workspace=ws, steps=[trainStep])\n",
        "print(\"Pipeline is built\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Publish the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from datetime import datetime\n",
        "\n",
        "timenow = datetime.now().strftime('%m-%d-%Y-%H-%M')\n",
        "\n",
        "pipeline_name = timenow + \"-Pipeline\"\n",
        "print(pipeline_name)\n",
        "\n",
        "published_pipeline1 = pipeline1.publish(\n",
        "    name=pipeline_name, \n",
        "    description=pipeline_name)\n",
        "print(\"Newly published pipeline id: {}\".format(published_pipeline1.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Schedule Operations\n",
        "Schedule operations require the ID of a published pipeline. You can list all published pipelines and run Schedule operations on them, or, if you already know the ID of the published pipeline, use it directly.\n",
        "### Get published pipeline ID"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PublishedPipeline\n",
        "\n",
        "# You could retrieve all pipelines that are published, or \n",
        "# just get the published pipeline object that you have the ID for.\n",
        "\n",
        "# Get all published pipeline objects in the workspace\n",
        "all_pub_pipelines = PublishedPipeline.list(ws)\n",
        "\n",
        "# We will iterate through the list of published pipelines and \n",
        "# use the last ID in the list for Schedule operations: \n",
        "print(\"Published pipelines found in the workspace:\")\n",
        "for pub_pipeline in all_pub_pipelines:\n",
        "    print(pub_pipeline.id)\n",
        "    pub_pipeline_id = pub_pipeline.id\n",
        "\n",
        "print(\"Published pipeline id to be used for Schedule operations: {}\".format(pub_pipeline_id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create a schedule for the pipeline using a recurrence\n",
        "This schedule will run on a specified recurrence interval."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule\n",
        "\n",
        "recurrence = ScheduleRecurrence(frequency=\"Day\", interval=2, hours=[22], minutes=[30]) # Runs every other day at 10:30pm\n",
        "\n",
        "schedule = Schedule.create(workspace=ws, name=\"My_Schedule\",\n",
        "                           pipeline_id=pub_pipeline_id, \n",
        "                           experiment_name='Schedule_Run',\n",
        "                           recurrence=recurrence,\n",
        "                           wait_for_provisioning=True,\n",
        "                           description=\"Schedule Run\")\n",
        "\n",
        "# You may want to make sure that the schedule is provisioned properly\n",
        "# before making any further changes to the schedule\n",
        "\n",
        "print(\"Created schedule with id: {}\".format(schedule.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Note: Set the `wait_for_provisioning` flag to False if you do not want to wait for the call to provision the schedule in the backend."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Get all schedules for a given pipeline\n",
        "Once you have the published pipeline ID, then you can get all schedules for that pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "schedules = Schedule.list(ws, pipeline_id=pub_pipeline_id)\n",
        "\n",
        "# We will iterate through the list of schedules and \n",
        "# use the last recurrence schedule in the list for further operations: \n",
        "print(\"Found these schedules for the pipeline id {}:\".format(pub_pipeline_id))\n",
        "for schedule in schedules: \n",
        "    print(schedule.id)\n",
        "    if schedule.recurrence is not None:\n",
        "        schedule_id = schedule.id\n",
        "\n",
        "print(\"Schedule id to be used for schedule operations: {}\".format(schedule_id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Get all schedules in your workspace\n",
        "You can also iterate through all schedules in your workspace if needed."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Use active_only=False to get all schedules including disabled schedules\n",
        "schedules = Schedule.list(ws, active_only=True) \n",
        "print(\"Your workspace has the following schedules set up:\")\n",
        "for schedule in schedules:\n",
        "    print(\"{} (Published pipeline: {})\".format(schedule.id, schedule.pipeline_id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Get the schedule"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "fetched_schedule = Schedule.get(ws, schedule_id)\n",
        "print(\"Using schedule with id: {}\".format(fetched_schedule.id))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Disable the schedule\n",
        "As a best practice, disable schedules when they are not in use.\n",
        "The number of schedule triggers allowed per month per region per subscription is 100,000.\n",
        "This is calculated using the projected trigger counts for all active schedules."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set the wait_for_provisioning flag to False if you do not want to wait  \n",
        "# for the call to provision the schedule in the backend.\n",
        "fetched_schedule.disable(wait_for_provisioning=True)\n",
        "fetched_schedule = Schedule.get(ws, schedule_id)\n",
        "print(\"Disabled schedule {}. New status is: {}\".format(fetched_schedule.id, fetched_schedule.status))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Re-enable the schedule"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set the wait_for_provisioning flag to False if you do not want to wait  \n",
        "# for the call to provision the schedule in the backend.\n",
        "fetched_schedule.enable(wait_for_provisioning=True)\n",
        "fetched_schedule = Schedule.get(ws, schedule_id)\n",
        "print(\"Enabled schedule {}. New status is: {}\".format(fetched_schedule.id, fetched_schedule.status))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Change recurrence of the schedule"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set the wait_for_provisioning flag to False if you do not want to wait  \n",
        "# for the call to provision the schedule in the backend.\n",
        "recurrence = ScheduleRecurrence(frequency=\"Hour\", interval=2) # Runs every two hours\n",
        "\n",
        "fetched_schedule = Schedule.get(ws, schedule_id)\n",
        "\n",
        "fetched_schedule.update(name=\"My_Updated_Schedule\", \n",
        "                        description=\"Updated_Schedule_Run\", \n",
        "                        status='Active', \n",
        "                        wait_for_provisioning=True,\n",
        "                        recurrence=recurrence)\n",
        "\n",
        "fetched_schedule = Schedule.get(ws, fetched_schedule.id)\n",
        "\n",
        "print(\"Updated schedule:\", fetched_schedule.id, \n",
        "      \"\\nNew name:\", fetched_schedule.name,\n",
        "      \"\\nNew frequency:\", fetched_schedule.recurrence.frequency,\n",
        "      \"\\nNew status:\", fetched_schedule.status)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Create a schedule for the pipeline using a Datastore\n",
        "This schedule will run when Blobs in the Datastore are added or modified.\n",
        "By default, the whole Datastore container is monitored for changes. To monitor a specific path instead, use the `path_on_datastore` parameter. The path is relative to the Datastore's container, so the actual path monitored is `container/path_on_datastore`. Changes made to subfolders of the monitored container or path do not trigger the schedule.\n",
        "\n",
        "Note: Only Blob Datastores are supported."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.datastore import Datastore\n",
        "\n",
        "datastore = Datastore(workspace=ws, name=\"workspaceblobstore\")\n",
        "\n",
        "schedule = Schedule.create(workspace=ws, name=\"My_Schedule\",\n",
        "                           pipeline_id=pub_pipeline_id, \n",
        "                           experiment_name='Schedule_Run',\n",
        "                           datastore=datastore,\n",
        "                           wait_for_provisioning=True,\n",
        "                           description=\"Schedule Run\")\n",
        "# Optional arguments to Schedule.create:\n",
        "#   polling_interval=5: how often, in minutes, to poll for blob additions/modifications (default is 5 minutes).\n",
        "#   path_on_datastore=\"file/path\": a specific folder on the datastore to monitor for changes.\n",
        "\n",
        "# You may want to make sure that the schedule is provisioned properly\n",
        "# before making any further changes to the schedule\n",
        "\n",
        "print(\"Created schedule with id: {}\".format(schedule.id))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set the wait_for_provisioning flag to False if you do not want to wait  \n",
        "# for the call to provision the schedule in the backend.\n",
        "schedule.disable(wait_for_provisioning=True)\n",
        "# Re-fetch the schedule that was just disabled (the Datastore schedule created above)\n",
        "schedule = Schedule.get(ws, schedule.id)\n",
        "print(\"Disabled schedule {}. New status is: {}\".format(schedule.id, schedule.status))"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "How to Setup a Schedule for a Published Pipeline",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 10,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of Schedules for Published Pipelines"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}